//  Copyright (c) 2023-present, Qihoo, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.

#pragma once

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <string>
#include "pika_stream_meta_value.h"
#include "pika_stream_types.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"

namespace storage {

// the max number of each delete operation in XTRIM command，to avoid too much memory usage.
// eg. if a XTIRM command need to trim 10000 items, the implementation will use rocsDB's delete operation (10000 /
// kDEFAULT_TRIM_BATCH_SIZE) times
const static int32_t kDEFAULT_TRIM_BATCH_SIZE = 1000;
struct StreamAddTrimArgs {
  // XADD options
  streamID id;
  bool id_given{false};
  bool seq_given{false};
  bool no_mkstream{false};

  // XADD + XTRIM common options
  StreamTrimStrategy trim_strategy{TRIM_STRATEGY_NONE};
  int trim_strategy_arg_idx{0};

  // TRIM_STRATEGY_MAXLEN options
  uint64_t maxlen{0};
  streamID minid;
};

struct StreamReadGroupReadArgs {
  // XREAD + XREADGROUP common options
  std::vector<std::string> keys;
  std::vector<std::string> unparsed_ids;
  int32_t count{INT32_MAX};  // The limit of read, in redis this is uint64_t, but PKHScanRange only support int32_t
  uint64_t block{0};         // 0 means no block

  // XREADGROUP options
  std::string group_name;
  std::string consumer_name;
  bool noack_{false};
};

struct StreamScanArgs {
  streamID start_sid;
  streamID end_sid;
  int32_t limit{INT32_MAX};
  bool start_ex{false};    // exclude first message
  bool end_ex{false};      // exclude last message
  bool is_reverse{false};  // scan in reverse order
};

struct StreamInfoResult {
  int32_t length{0};
  std::string last_id_str;
  std::string max_deleted_entry_id_str;
  uint64_t entries_added{0};
  std::string first_id_str;
};

class StreamUtils {
 public:
  StreamUtils() = default;
  ~StreamUtils() = default;

  static bool string2uint64(const char* s, uint64_t& value);
  static bool string2int64(const char* s, int64_t& value);
  static bool string2int32(const char* s, int32_t& value);

  static uint64_t GetCurrentTimeMs();

  // serialize the message to a string.
  // format: {field1.size, field1, value1.size, value1, field2.size, field2, ...}
  static bool SerializeMessage(const std::vector<std::string>& field_values, std::string& serialized_message,
                               int field_pos);

  // deserialize the message from a string with the format of SerializeMessage.
  static bool DeserializeMessage(const std::string& message, std::vector<std::string>& parsed_message);

  // Parse a stream ID in the format given by clients to Pika, that is
  // <ms>-<seq>, and converts it into a streamID structure. The ID may be in incomplete
  // form, just stating the milliseconds time part of the stream. In such a case
  // the missing part is set according to the value of 'missing_seq' parameter.
  //
  // The IDs "-" and "+" specify respectively the minimum and maximum IDs
  // that can be represented. If 'strict' is set to 1, "-" and "+" will be
  // treated as an invalid ID.
  //
  // The ID form <ms>-* specifies a millisconds-only ID, leaving the sequence part
  // to be autogenerated. When a non-NULL 'seq_given' argument is provided, this
  // form is accepted and the argument is set to 0 unless the sequence part is
  // specified.
  static bool StreamGenericParseID(const std::string& var, streamID& id, uint64_t missing_seq, bool strict,
                                   bool* seq_given);

  // Wrapper for streamGenericParseID() with 'strict' argument set to
  // 0, to be used when - and + are acceptable IDs.
  static bool StreamParseID(const std::string& var, streamID& id, uint64_t missing_seq);

  // Wrapper for streamGenericParseID() with 'strict' argument set to
  // 1, to be used when we want to return an error if the special IDs + or -
  // are provided.
  static bool StreamParseStrictID(const std::string& var, streamID& id, uint64_t missing_seq, bool* seq_given);

  // Helper for parsing a stream ID that is a range query interval. When the
  // exclude argument is NULL, streamParseID() is called and the interval
  // is treated as close (inclusive). Otherwise, the exclude argument is set if
  // the interval is open (the "(" prefix) and streamParseStrictID() is
  // called in that case.
  static bool StreamParseIntervalId(const std::string& var, streamID& id, bool* exclude, uint64_t missing_seq);
};

struct ScanStreamOptions {
  const rocksdb::Slice key;  // the key of the stream
  uint64_t version;           // the version of the stream
  streamID start_sid;
  streamID end_sid;
  int32_t limit;
  bool start_ex;    // exclude first message
  bool end_ex;      // exclude last message
  bool is_reverse;  // scan in reverse order
  ScanStreamOptions(const rocksdb::Slice skey, uint64_t version, streamID start_sid, streamID end_sid, int32_t count,
                    bool start_ex = false, bool end_ex = false, bool is_reverse = false)
      : key(skey),
        version(version),
        start_sid(start_sid),
        end_sid(end_sid),
        limit(count),
        start_ex(start_ex),
        end_ex(end_ex),
        is_reverse(is_reverse) {}
};
}

